iT邦幫忙

2021 iThome 鐵人賽

DAY 16
1

在上一篇文章我們建立 Channel 時,使用 Channel<E>() 來建立一個 Channel。這個方法是由 Coroutine 所提供的一個用來建立 Channel 的函式。我們來看一下這個函式的簽名:

public fun <E> Channel(
    capacity: Int = RENDEZVOUS,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
    onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E>

在這邊有三個參數,capacity、onBufferOverflow、onUndeliveredElement,當我們沒有帶任何參數進來的時候,預設是會使用

capacity: Int = RENDEZVOUS

onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND

onUndeliveredElement: ((E) -> Unit)? = null

依據帶入的參數不同,Channel 的特性也不一樣,下面將列出所有的參數:


Capacity

這邊的 Capacity 指的是緩衝區的容量,Channel 提供了五種 capacity:

  • RENDEZVOUS
  • CONFLATED
  • UNLIMITED
  • BUFFERED
  • 其他

RENDEZVOUS Channel

預設的 Capacity,這是一種沒有 Buffer 的 channel,經由 channel 傳送的元素會在 send() 以及 receive() 同時呼叫時,才會傳送過去。當 receive() 調用時, send() 就會 suspend 。反之,當 send() 調用時, receive() 就會 suspend 。換句話說, send() 與 receive() 是一組一組成雙成對的。

另外,假設調用 send() 但是 receive() 卻沒有調用,這時候這個 suspend 函式就不會被調用,反之,如果只有調用 receive() 沒有調用 send() ,這時候因為 channel 是空的, 調用 receive() 沒有內容,所以 receive() 就會進入 suspend 的狀態,直到有一個 send() 被調用。

RENDEZVOUS -> {
      if (onBufferOverflow == BufferOverflow.SUSPEND)
          RendezvousChannel(onUndeliveredElement) // an efficient implementation of rendezvous channel
      else
          ArrayChannel(1, onBufferOverflow, onUndeliveredElement) // support buffer overflow with buffered channel
  }

Channel() 這個 Builder 裏面的內容來看,我們發現,使用 RENDEZVOUS 時,必須要將 onBufferOverflow 設定為 BufferOverflow.SUSPEND ,此時才是會建立 RendezvousChannel() ,否則就會建立一個 ArrayChannel() 且 capacity 為 1。(Channel() 預設的 onBufferOverflow 的值就是設定為 BufferOverflow.SUSPEND。)

CONFLATED Channel

Conflated channel (合併的通道),它在使用上並不是將所有的元素都合併起來,而是只保留最後一個元素,換句話說,在這個 channel 中,它的 Buffer 只有一個,而這個 Buffer 只會把最新的元素保留下來,在下一個 send() 調用之前如果沒有調用 receive() ,那麼在 Buffer 裏面的元素將會被丟掉。

看下面的範例:

class Day16 {
    val scope = CoroutineScope(Job())

    suspend fun conflatedBroadcastChannel(): Unit {
        val channel = Channel<Int>(capacity = Channel.CONFLATED)
        scope.launch {
            for(i in 0..10){
                channel.send(i)
            }
            
            delay(1000)
            channel.send(9)
        }
        
        for(i in 0 .. 100){
            delay(100)
            println(channel.receive())
        }
        
        println("done")
    }
}

→ 我們在 scope.launch{ ... } 中發送 Int 至 channel 中,其中發送的部分有兩段,一段是在 channel 中直接塞入0~10,接著過了一秒鐘,另外調用 channel.send(9) 將整數 9 傳進 channel 中。

→ 在 scope.launch{} 外側則是使用一個 for-loop 來取用 channel 裏面的值,我們猜猜看會是怎麼樣的結果呢?

10
9

如我們所說的, Conflated channel 只會保留最後的元素,直到有另外的 receive() 來把值取走。所以上面範例的第一段 channel.send() 因為發送的時間比取值的時間還要快,所以所有的值都以會新的蓋掉舊的行為進行,應為第一個 channel.receive() 呼叫時,已經過了 100 毫秒。

我在這邊刻意將含有 channel.receive() 的 for-loop 時間拉長,為了就是要包含到下一個 send() 的時間,以上面這個範例來說, 第一段的 channel.send() 做完之後,延遲了 1 秒鐘,接著會調用 channel.send(9) 把整數 9 送出,這個行為剛好會在 channel.receive() 這個 for-loop 的範圍內,所以 9 也會被列印出來。

之後呢?因為我們使用 for-loop 嘗試調用 channel.receive() 取出 100 個元素,但是我們只有成功取出兩個元素,所以這個 receive() 就會持續等待,直到所有的 receive() 都取出值。

  • 有趣的點:
CONFLATED -> {
            require(onBufferOverflow == BufferOverflow.SUSPEND) {
                "CONFLATED capacity cannot be used with non-default onBufferOverflow"
            }
            ConflatedChannel(onUndeliveredElement)
        }

使用 Channel() 時,會要求你的 onBufferOverflow 要為 BufferOverflow.SUSPEND ,但是進入 CONFLATED 的定義去看,又會發現

Requests a conflated channel in the Channel(...) factory function. This is a shortcut to creating a channel with onBufferOverflow = DROP_OLDEST.

其實它是建立一個使用 onBufferOverflow = DROP_OLDEST 的 channel。


UNLIMITED channel

Unlimited channel 應該比較好懂一點,從名字看起來就知道它的 Buffer 是無限制的。

UNLIMITED -> LinkedListChannel(onUndeliveredElement) // ignores onBufferOverflow: it has buffer, but it never overflows

從 Channel() 得知,UNLIMITED 會建立 LinkedListChannel ,LinkedList 我想應該大家都知道意思,它就是一個有順序性的列表。那麼 LinkedListChannel 就是一個有順序性的列表含有一個無限制的 Buffer。它的實作如下:

internal open class LinkedListChannel<E>(onUndeliveredElement: OnUndeliveredElement<E>?) : AbstractChannel<E>(onUndeliveredElement) {
    protected final override val isBufferAlwaysEmpty: Boolean get() = true
    protected final override val isBufferEmpty: Boolean get() = true
    protected final override val isBufferAlwaysFull: Boolean get() = false
    protected final override val isBufferFull: Boolean get() = false
...

可以發現 isBufferAlwaysFull 以及 isBufferFull 都是回傳 false,也就是都不會滿,那麼也就不會走到 onBufferOverflow 的情況了。(不過這邊的無限制的 unlimited 的 buffer 最終還是需要看記憶體夠不夠放)

class Day16 {
    val scope = CoroutineScope(Job())
    suspend fun conflatedChannel(): Unit {
        val channel = Channel<Int>(capacity = Channel.UNLIMITED)
        scope.launch {
            for (i in 0..10) {
                channel.send(threeTimesInt(i))
            }
        }

        for (i in 0..10) {
            println(channel.receive())
        }
        println("done")
    }

    private suspend fun threeTimesInt(x: Int): Int {
        delay(100L * x)
        return x * 3
    }
}
0
3
6
9
12
15
18
21
24
27
30
done

我在使用 UNLIMITED 以及預設的 RENDEZVOUS 時感覺很相像,但是差異在於 RENDEZVOUS 所產生出來的 Channel 是由 RendezvousChannel(onUndeliveredElement) 所建立的。而RendezvousChannel(onUndeliveredElement) 的實作如下:

internal open class RendezvousChannel<E>(onUndeliveredElement: OnUndeliveredElement<E>?): AbstractChannel<E>(onUndeliveredElement) {
protected final override val isBufferAlwaysEmpty: Boolean get()= true
    protected final override val isBufferEmpty: Boolean get()= true
    protected final override val isBufferAlwaysFull: Boolean get()= true
    protected final override val isBufferFull: Boolean get()= true
}

我們可以發現,isBufferAlwaysFull 以及 isBufferFull 都是回傳 true,所以預設的 RENDEZVOUS channel 是會走到 onBufferOverflow 的情況,與 UNLIMITED channel 不同。

在 UNLIMITED channel 下調用 send() 是永遠不會被 suspend 的,而 RENDEZVOUS channel 是會 suspend。我們假設將上面這段程式稍作修改

class Day16 {
    val scope = CoroutineScope(Job())
    suspend fun conflatedChannel(): Unit {
        val channel = Channel<Int>(capacity = Channel.UNLIMITED)
        scope.launch {
            for (i in 0..10) {
								println("first: $i")
                channel.send(threeTimesInt(i))
            }
        }

				scope.launch {
            for (i in 0..10) {
                println("second: $i")
                channel.send(fiveTimesInt(i))
            }
        }

				delay(100)
        for (i in 0..20) {
            println(channel.receive())
        }

        println("done")
    }

    private suspend fun threeTimesInt(x: Int): Int {
        return x * 3
    }
		private suspend fun fiveTimesInt(x: Int): Int {
        return x * 5
    }

}
// capacity = Channel.RENDEZVOUS
first: 0
second: 0
0
first: 1
0
second: 1
first: 2
3
5
second: 2
first: 3
6
10
second: 3
first: 4
9
15
12
second: 4
first: 5
second: 5
20
15
first: 6
second: 6
25
18
30
first: 7
second: 7
first: 8
21
35
second: 8
first: 9
24
40
second: 9
first: 10
27
45
second: 10
30
done

→ 因為 send() 是會 suspend 的,所以會在不同的 coroutine scope 中切換。

如果使用的是 UNLIMITED channel ,其結果會是:

first: 0
first: 1
first: 2
first: 3
first: 4
first: 5
first: 6
first: 7
first: 8
first: 9
first: 10
second: 0
second: 1
second: 2
second: 3
second: 4
second: 5
second: 6
second: 7
second: 8
second: 9
second: 10
0
3
6
9
12
15
18
21
24
27
30
0
5
10
15
20
25
30
35
40
45
done

→ 因為 Buffer 有無限的空間,所以可以存完所有資料之後才列印出來。


最後則是 Buffer 的容量不為 0 也不是無限的

在前面我們知道,當 Buffer 的容量為 0 時(RENDEZVOUS),每一次的調用 send() ,send() 都會 suspend ,也就是說會把執行緒的使用權切換走,直到調用 receive()

在 Buffer 不為 0 的 channel 中, send() 只會在 channel 滿的時候才會 suspend,也就是說會根據我們所設置的大小來決定什麼時機點會 suspend send()。同樣地,receive() 也是會在 buffer 為空的時候會 suspend 。

在 Channel() 中,我們注意到當 capacity ==1 且 onBufferOverflow == BufferOverflow.DROP_OLDEST 時,我們就會建立 ConflatedChannel() ,否則就會建立 ArrayChannel()

if (capacity == 1 && onBufferOverflow == BufferOverflow.DROP_OLDEST)
          ConflatedChannel(onUndeliveredElement) // conflated implementation is more efficient but appears to work in the same way
      else
          ArrayChannel(capacity, onBufferOverflow, onUndeliveredElement)

在 ArrayChannel 的裏面,會根據帶入的 capacity 與現在的狀態來比對 buffer 是否為滿的狀態。

internal open class ArrayChannel<E>(
    /**
     * Buffer capacity.
     */
    private val capacity: Int,
    private val onBufferOverflow: BufferOverflow,
    onUndeliveredElement: OnUndeliveredElement<E>?
){
...
		protected final override val isBufferAlwaysEmpty: Boolean get() = false
    protected final override val isBufferEmpty: Boolean get() = state.size == 0
    protected final override val isBufferAlwaysFull: Boolean get() = false
    protected final override val isBufferFull: Boolean get() = state.size == capacity
...

小結

有四種 Channel ,我們可以依照需求來選擇使用,特別要注意的是, receive() 的行為是固定的,receive() 如果在 channel 為空的時候調用,那麼 receive() 就會 suspend 直到 channel 不為空。

而 send() 則是會根據不同 channel 的 buffer 容量大小來決定何時會 suspend。UNLIMITED 因為具有無限大的 buffer 大小,所以 send() 不會 suspend()。

預設是使用 RENDEZVOUS Channel ,也就是 buffer 的 容量為 0,所以 send() 與 receive() 必須要成雙成對的呼叫,否則就會 suspend。

參考資料

[Kotlin Coroutine Channel] (https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel/index.html)

特別感謝

Kotlin Taiwan User Group
Kotlin 讀書會


由本系列文改編的《Kotlin 小宇宙:使用 Coroutine 優雅的執行非同步任務》已經上市囉。

有興趣的讀者歡迎參考:https://coroutine.kotlin.tips/
天瓏書局


上一篇
Day15:Channel 的第一堂課
下一篇
Day17:Flow,一個非同步的資料流。 First Look
系列文
Coroutine 停看聽30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言